package com.km.algorithm

/**
  * Created by lenovo on 2017/4/18.
  */

import org.apache.spark.SparkContext._
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by LiuShifeng on 2015/7/8.
  */
object TTest {
  def main(args: Array[String]) {
    if (args.length < 3) {
      System.out.println("Error!")
      System.exit(1)
    }

    val positiveFilePath = "C:\\Users\\lenovo\\Desktop\\a.txt";
    val negativeFilePath = "C:\\Users\\lenovo\\Desktop\\b.txt";
    val position = 0.05.toInt

    val sparkConf = new SparkConf().setAppName("TTest").setMaster("local[8]")
    val sc = new SparkContext(sparkConf)

    val positiveRDD = sc.textFile(positiveFilePath).map(x => {
      val s = x.split(",")
      val l = s.length
      val value = new Array[Double](l - position)
      val num = new Array[Int](l - position)
      for (i <- position until l) {
        if (s(i).toDouble > 0) {
          value(i - position) = math.log(s(i).toDouble)
          num(i - position) = 1
        }
      }
      var key = "positive"
      (key, (value, num))
    })
    //    println(positiveRDD.first())
    val negativeRDD = sc.textFile(negativeFilePath).map(x => {
      val s = x.split(",")
      val l = s.length
      val value = new Array[Double](l - position)
      val num = new Array[Int](l - position)
      for (i <- position until l) {
        if (s(i).toDouble > 0) {
          value(i - position) = math.log(s(i).toDouble)
          num(i - position) = 1
        }
      }
      var key = "negative"
      (key, (value, num))
    })
    //    println(negativeRDD.first())

    val positiveInterestHierachySimMeanNum = positiveRDD.map(x => {
      ("positiveMean", x._2)
    }).reduceByKey((x, y) => {
      val l = x._1.length
      val value = new Array[Double](l)
      val num = new Array[Int](l)
      for (i <- 0 until l) {
        value(i) = x._1(i) + y._1(i)
        num(i) = x._2(i) + y._2(i)
      }
      (value, num)
    }).map(x => {
      val l = x._2._1.length
      val result = new Array[Double](l)
      for (i <- 0 until l) {
        result(i) = x._2._1(i) / x._2._2(i)
      }
      (result, x._2._2)
    }).first()
    val positiveInterestHierachySimMean = positiveInterestHierachySimMeanNum._1
    val positiveInterestHierachySimNum = positiveInterestHierachySimMeanNum._2
    println("positiveInterestHierachySimMean  " + positiveInterestHierachySimMean.mkString(","))
    println("positiveInterestHierachySimNum " + positiveInterestHierachySimNum.mkString(","))

    val negativeInterestHierachySimMeanNum = negativeRDD.map(x => {
      ("negativeMean", x._2)
    }).reduceByKey((x, y) => {
      val l = x._1.length
      val value = new Array[Double](l)
      val num = new Array[Int](l)
      for (i <- 0 until l) {
        value(i) = x._1(i) + y._1(i)
        num(i) = x._2(i) + y._2(i)
      }
      (value, num)
    }).map(x => {
      val l = x._2._1.length
      val result = new Array[Double](l)
      for (i <- 0 until l) {
        result(i) = x._2._1(i) / x._2._2(i)
      }
      (result, x._2._2)
    }).first()
    val negativeInterestHierachySimMean = negativeInterestHierachySimMeanNum._1
    val negativeInterestHierachySimNum = negativeInterestHierachySimMeanNum._2
    println("negativeInterestHierachySimMean  " + negativeInterestHierachySimMean.mkString(","))
    println("negativeInterestHierachySimNum " + negativeInterestHierachySimNum.mkString(","))

    val positiveInterestHierachySimVariance = positiveRDD.map(x => {
      val l = x._2._1.length
      val value = new Array[Double](l)
      for (i <- 0 until l) {
        value(i) = math.pow(x._2._1(i) - positiveInterestHierachySimMean(i), 2)
      }
      ("positiveVariance", (value, x._2._2))
    }).reduceByKey((x, y) => {
      val l = x._1.length
      val value = new Array[Double](l)
      val num = new Array[Int](l)
      for (i <- 0 until l) {
        value(i) = x._1(i) + y._1(i)
        num(i) = x._2(i) + y._2(i)
      }
      (value, num)
    }).map(x => {
      val l = x._2._1.length
      val result = new Array[Double](l)
      for (i <- 0 until l) {
        result(i) = x._2._1(i) / x._2._2(i)
      }
      (result)
    }).first()
    println("positiveInterestHierachySimVariance  " + positiveInterestHierachySimVariance.mkString(","))

    val negativeInterestHierachySimVariance = negativeRDD.map(x => {
      val l = x._2._1.length
      val value = new Array[Double](l)
      for (i <- 0 until l) {
        value(i) = math.pow(x._2._1(i) - positiveInterestHierachySimMean(i), 2)
      }
      ("negativeVariance", (value, x._2._2))
    }).reduceByKey((x, y) => {
      val l = x._1.length
      val value = new Array[Double](l)
      val num = new Array[Int](l)
      for (i <- 0 until l) {
        value(i) = x._1(i) + y._1(i)
        num(i) = x._2(i) + y._2(i)
      }
      (value, num)
    }).map(x => {
      val l = x._2._1.length
      val result = new Array[Double](l)
      for (i <- 0 until l) {
        result(i) = x._2._1(i) / x._2._2(i)
      }
      (result)
    }).first()
    println("negativeInterestHierachySimVariance  " + negativeInterestHierachySimVariance.mkString(","))

    val length = positiveInterestHierachySimMean.length
    val tTest = new Array[Double](length)
    for (i <- 0 until length) {
      tTest(i) = tScore(positiveInterestHierachySimMean(i), negativeInterestHierachySimMean(i), positiveInterestHierachySimVariance(i), negativeInterestHierachySimVariance(i), positiveInterestHierachySimNum(i), negativeInterestHierachySimNum(i))
    }
    println("tTest " + tTest.mkString(","))

    val df = new Array[Double](length)
    for (i <- 0 until length) {
      df(i) = degreeOfFreedom(positiveInterestHierachySimVariance(i), negativeInterestHierachySimVariance(i), positiveInterestHierachySimNum(i), negativeInterestHierachySimNum(i))
    }
    println("df " + df.mkString(","))

    sc.stop()
  }

  def tScore(Xa: Double, Xb: Double, Va: Double, Vb: Double, Na: Long, Nb: Long): Double = {
    (Xa - Xb) / (math.sqrt(Va / Na + Vb / Nb))
  }

  def degreeOfFreedom(Va: Double, Vb: Double, Na: Long, Nb: Long): Double = {
    math.pow((Va / Na + Va / Nb), 2) / (math.pow(Va / Na, 2) / (Na - 1) + math.pow(Vb / Nb, 2) / (Nb - 1))
  }
}